public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // 构造一个新的节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 首先获取 putLock 的锁,保证putLock的线程安全性 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ // 如果队列已满,则等待消费者调用 notFull.signal() while (count.get() == capacity) { notFull.await(); } // 入队 enqueue(node); // 原子加1 c = count.getAndIncrement(); // 如果此时队列没有满,调用 signal 更新 count if (c + 1 < capacity) notFull.signal(); } finally { // 释放锁 putLock.unlock(); } // 如果 c 为 0;这里的 c 为之前的值 // 也就是说,现在已经加1了,可以提示生产者 if (c == 0) signalNotEmpty(); }
/**
 * Takes an element via {@code dequeue()} without waiting.
 *
 * @return the dequeued element, or {@code null} if the element count is
 *         zero either before or after {@code takeLock} is acquired
 */
public E poll() {
    final AtomicInteger size = this.count;
    // Fast path: nothing to take, so skip locking altogether.
    if (size.get() == 0) {
        return null;
    }

    final ReentrantLock lock = this.takeLock;
    E item = null;
    // Stays negative unless an element is actually removed.
    int before = -1;
    lock.lock();
    try {
        // Re-check under the lock; the count may have dropped since the fast path.
        if (size.get() > 0) {
            item = dequeue();
            before = size.getAndDecrement();
            // More than one element was present, so one remains: signal notEmpty.
            if (before > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        lock.unlock();
    }

    // The count was at capacity before this removal, so there is room again.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Takes an element via {@code dequeue()}, waiting on {@code notEmpty} for up
 * to the given time while the element count is zero.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit    the time unit of {@code timeout}
 * @return the dequeued element, or {@code null} if the wait time runs out
 *         while the count is still zero
 * @throws InterruptedException if the thread is interrupted while acquiring
 *         {@code takeLock} or while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E item = null;
    // Stays negative unless an element is actually removed.
    int before = -1;
    // Convert the timeout before taking the lock.
    long remainingNanos = unit.toNanos(timeout);
    final AtomicInteger size = this.count;
    final ReentrantLock lock = this.takeLock;
    lock.lockInterruptibly();
    try {
        while (size.get() == 0) {
            // Time budget used up and still nothing to take.
            if (remainingNanos <= 0) {
                return null;
            }
            // awaitNanos returns the time left, so waits never exceed the budget.
            remainingNanos = notEmpty.awaitNanos(remainingNanos);
        }
        item = dequeue();
        before = size.getAndDecrement();
        // More than one element was present, so one remains: signal notEmpty.
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }

    // The count was at capacity before this removal, so there is room again.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/** * @author Create by xuantang * @date on 8/22/18 */ public class ReentrantLockDemo { private static ReentrantLock mLock = new ReentrantLock(); private static Condition mCondition = mLock.newCondition();
/**
 * Demo entry point: starts three {@code WaitThread}s, then one
 * {@code NotifyThread}, in that order.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final String[] waiterNames = {"waiter one", "waiter two", "waiter three"};
    // Construct and start each waiter in turn.
    for (String waiterName : waiterNames) {
        new WaitThread(waiterName).start();
    }
    // The notifier is started only after every waiter has been started.
    new NotifyThread("notify one").start();
}